Flink作业提交(三) 您所在的位置:网站首页 flink 提交job Flink作业提交(三)

Flink作业提交(三)

2023-03-27 21:56| 来源: 网络整理| 查看: 265

源码分析JobMaster如何run起来 介绍到了JobMaster.start方法,这个方法主要是启动rpc服务,并且运行job,接下来看下怎么run job?本文内容是基于Flink 1.9来讲解。

1. 首先看下JobMaster.start方法源码 /** * Start the rpc service and begin to run the job. * * @param newJobMasterId The necessary fencing token to run the job * @return Future acknowledge if the job could be started. Otherwise the future contains an exception */ public CompletableFuture start(final JobMasterId newJobMasterId) throws Exception { // make sure we receive RPC and async calls start(); return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT); } start():JM/TM之间的通信都是通过rpc进行的,这一步是启动JM rpc server服务 startJobExecution:异步调用该方法,开始执行job。下面介绍该方法 2. 接下来看JobMaster.startJobExecution方法 //-- job starting and stopping ----------------------------------------------------------------- private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception { validateRunsInMainThread(); checkNotNull(newJobMasterId, "The new JobMasterId must not be null."); if (Objects.equals(getFencingToken(), newJobMasterId)) { log.info("Already started the job execution with JobMasterId {}.", newJobMasterId); return Acknowledge.get(); } setNewFencingToken(newJobMasterId); startJobMasterServices(); log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId); resetAndStartScheduler(); return Acknowledge.get(); }

这里有两个重要的方法 startJobMasterServices(); 和 resetAndStartScheduler();   这两个方法里涉及到的东西都比较多,分开来介绍。

2.1 开始分析 startJobMasterServices() 方法

2.1.1 startHeartbeatServices();

开启TM心跳定时任务 开启RM心跳定时任务

2.1.2 slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

背景知识:SlotPool是干什么的? SlotPool是为当前作业的slot请求服务的。 它会缓存一个可用slots map(availableSlots),如果有slot请求,首先会先去check availableSlots是否可以满足需求,如果不可以,会向ResourceManager申请新slot(slot使用生成的AllocationID唯一标识)。如果说ResourceManager不可用,或者请求超时,SlotPool会把请求置为fail。 如果ResourceManager挂掉,SlotPool也可以提供空闲的slot进行分配。 释放不再使用的slot,比如:作业已经完全run起来了还有一些空闲slot。 slotPool.start做了哪些工作? 检查空闲的slot,释放过期的slot。   - 释放过期slot的过程:      - 如果slot上没有task,直接把TaskSlotState标记为free。      - 如果slot不为空,也就是有task,那先把TaskSlotState标记为releasing,并且fail上面的tasks。然后TM通知RM该slot可以置为free,RM的slotManager对象会把slot状态置为free。   - slot过期时间问题      - 默认值:50s      - 可以通过设置slot.idle.timeout来自己调整 checkBatchSlotTimeout,这块东西没有细看

2.1.3 scheduler.start(getMainThreadExecutor());

背景知识:scheduler是干什么的? Scheduler会把task分配到slot scheduler.start做了哪些工作? 把当前JM main thread executor赋值给componentMainThreadExecutor对象。

2.1.4 reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

如果存在RM连接,都close掉 把JM注册到RM,对应ResourceManager.registerJobManager方法 // ------------------------------------------------------------------------ // RPC methods // ------------------------------------------------------------------------ @Override public CompletableFuture registerJobManager( final JobMasterId jobMasterId, final ResourceID jobManagerResourceId, final String jobManagerAddress, final JobID jobId, final Time timeout) { checkNotNull(jobMasterId); checkNotNull(jobManagerResourceId); checkNotNull(jobManagerAddress); checkNotNull(jobId); if (!jobLeaderIdService.containsJob(jobId)) { try { jobLeaderIdService.addJob(jobId); } catch (Exception e) { ResourceManagerException exception = new ResourceManagerException("Could not add the job " + jobId + " to the job id leader service.", e); onFatalError(exception); log.error("Could not add job {} to job leader id service.", jobId, e); return FutureUtils.completedExceptionally(exception); } } log.info("Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId); CompletableFuture jobMasterIdFuture; try { jobMasterIdFuture = jobLeaderIdService.getLeaderId(jobId); } catch (Exception e) { // we cannot check the job leader id so let's fail // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + "job leader id future to verify the correct job leader.", e); onFatalError(exception); log.debug("Could not obtain the job leader id future to verify the correct job leader."); return FutureUtils.completedExceptionally(exception); } CompletableFuture jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class); CompletableFuture registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync( jobMasterIdFuture, (JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> { if (Objects.equals(leadingJobMasterId, jobMasterId)) { return registerJobMasterInternal( jobMasterGateway, jobId, jobManagerAddress, jobManagerResourceId); } else { final String declineMessage = String.format( "The leading JobMaster id %s did not match the received JobMaster id %s. " + "This indicates that a JobMaster leader change has happened.", leadingJobMasterId, jobMasterId); log.debug(declineMessage); return new RegistrationResponse.Decline(declineMessage); } }, getMainThreadExecutor()); // handle exceptions which might have occurred in one of the futures inputs of combine return registrationResponseFuture.handleAsync( (RegistrationResponse registrationResponse, Throwable throwable) -> { if (throwable != null) { if (log.isDebugEnabled()) { log.debug("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress, throwable); } else { log.info("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress); } return new RegistrationResponse.Decline(throwable.getMessage()); } else { return registrationResponse; } }, getRpcService().getExecutor()); } SlotPool与RM建立连接,这样SlotPool就可以向RM申请资源了

2.1.5 resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

启动RM leader选举服务 2.2 开始分析 resetAndStartScheduler() 方法

该方法主要处理作业调度相关的工作,包括申请slot以及对Execution进行deploy。首先看下该方法源码

private void resetAndStartScheduler() throws Exception { validateRunsInMainThread(); final CompletableFuture schedulerAssignedFuture; if (schedulerNG.requestJobStatus() == JobStatus.CREATED) { schedulerAssignedFuture = CompletableFuture.completedFuture(null); schedulerNG.setMainThreadExecutor(getMainThreadExecutor()); } else { suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled.")); final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup); schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle( (ignored, throwable) -> { newScheduler.setMainThreadExecutor(getMainThreadExecutor()); assignScheduler(newScheduler, newJobManagerJobMetricGroup); return null; } ); } schedulerAssignedFuture.thenRun(this::startScheduling); }

ExecutionGraph在构建的时候,通过ExecutionGraph成员变量列表可以看到,JobStatus默认是CREATED状态。因此resetAndStartScheduler方法首先走了if逻辑,然后是调用startScheduling,接下来看startScheduling方法逻辑。

@Override public void startScheduling() { mainThreadExecutor.assertRunningInMainThread(); try { executionGraph.scheduleForExecution(); } catch (Throwable t) { executionGraph.failGlobal(t); } }

会调用executionGraph.scheduleForExecution() --> SchedulingUtils.scheduleEager 重点看下SchedulingUtils.scheduleEager,这个方法主要做了两件事情

为每个 ExecutionVertex 申请slot,并把ExecutionVertex的Execution状态从CREATED变为SCHEDULED deploy 所有的 Execution

2.2.1 为每个 ExecutionVertex 申请slot 首先上源码,该源码在SchedulingUtils#scheduleEager方法中

// collecting all the slots may resize and fail in that operation without slots getting lost final ArrayList allAllocationFutures = new ArrayList(); final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy(); final Set allPreviousAllocationIds = Collections.unmodifiableSet( computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider())); // allocate the slots (obtain all their futures) for (ExecutionVertex ev : vertices) { // these calls are not blocking, they only return futures CompletableFuture allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution( slotProviderStrategy, LocationPreferenceConstraint.ALL, allPreviousAllocationIds); allAllocationFutures.add(allocationFuture); }

底层真正申请slot的源码在SchedulerImpl#allocateSingleSlot方法中

private CompletableFuture allocateSingleSlot( SlotRequestId slotRequestId, SlotProfile slotProfile, boolean allowQueuedScheduling, @Nullable Time allocationTimeout) { Optional slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile); if (slotAndLocality.isPresent()) { // already successful from available try { return CompletableFuture.completedFuture( completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get())); } catch (FlinkException e) { return FutureUtils.completedExceptionally(e); } } else if (allowQueuedScheduling) { // we allocate by requesting a new slot return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout) .thenApply((PhysicalSlot allocatedSlot) -> { try { return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN)); } catch (FlinkException e) { throw new CompletionException(e); } }); } else { // failed to allocate return FutureUtils.completedExceptionally( new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.')); } }

slot申请流程总结如下:

SlotPoolImpl会保留一个availableSlots map,首先会先去查找availableSlots是否可以满足slot申请条件 如果availableSlots没有可用slot,那会向RM申请资源   - flink1.9之后都是按需申请资源,如果作业执行需要的slot没有得到满足,YarnResourceManager 会向 Yarn 集群的 ResourceManager 申请新的 container,并启动 TaskManager

2.2.2 deploy 所有的 Execution 当所有的ExecutionVertex节点申请到slot之后,就开始进行部署,首先看下源码,该源码在SchedulingUtils#scheduleEager方法中

// this future is complete once all slot futures are complete. // the future fails once one slot future fails. final ConjunctFuture allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures); return allAllocationsFuture.thenAccept( (Collection executionsToDeploy) -> { for (Execution execution : executionsToDeploy) { try { execution.deploy(); } catch (Throwable t) { throw new CompletionException( new FlinkException( String.format("Could not deploy execution %s.", execution), t)); } } })

deploy方法主要做了下面几件事情

把ExecutionVertex节点这次执行对应的Execution,状态从SCHEDULED切换到DEPLOYING,如果状态切换失败,直接release 这个 slot 往TM上异步提交task    1. 生成对任务的描述 TaskDeploymentDescription       - 输入的描述 InputGateDeploymentDescriptor       - 输出的描述 ResultPartitionDeploymentDescriptor    2. 重新从 blob store 加载JobInformation和TaskInformation    3. 构建Task对象       - 在构造函数中会进行一些初始化工作,比较重要的是根据 InputGateDeploymentDescriptors 创建 InputGates,根据 ResultPartitionDeploymentDescriptors 创建 ResultPartitions       - Task是在一个TM上单并发subtask的一次执行       - Task是对Flink operator的封装       - Task提供了所有的必要服务,比如消费输入数据,输出结果以及和JM进行通信等       - Flink operator只有数据reader和writers,以及某些callback事件。task会把它们和network栈以及actor message进行连接,还可以追踪execution的状态以及处理exception       - 某个Task并不知道自己与其他task的关系,这些信息都在JM中;Task只知道自己需要执行的代码,task配置信息,以及消费或者生产的intermediate results 的IDs    4. 执行Task,task实现了Runnable接口,其实就是启动task线程。源码可以看Task#doRun方法       - 切换Task的状态,从CREATED变成DEPLOYING       - 在blob cache中注册job       - 获取user-code classloader,下载job的jar包或者class的时候会用到       - 反序列化拿到 ExecutionConfig,ExecutionConfig 中包含所有算子相关的信息,比如ExecutionMode,并发度等       - 向网络栈注册Task。会逐一注册所有的ResultPartitions和InputGates,并分配bufferPool。在注册InputGate的时候,会为每一个channel都请求对应的子分区。       - 切换Task的状态,从DEPLOYING变成RUNNING       - 通过反射调用task

至此,作业已经运行起来了

小结

JobMaster启动作业,主要分成两个步骤

启动作业运行依赖的服务,比如TM/RM心跳监控,slotPool,scheduler等 为所有的ExecutionVertex分配slot,并且deploy JM,TM 申请流程,入口是JobMaster#start方法    3.1 首先向RM申请JobMaster,并向RM注册    3.2 然后执行JobMaster#resetAndStartScheduler方法的时候,会先去为每个 ExecutionVertex 申请slot。如果availableSlots可以满足需求,就使用availableSlots;如果availableSlots不能满足需求,YarnResourceManager 会向 Yarn 集群的 ResourceManager 申请新的 container,并启动 TaskManager。


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有